PersonalizeのバッチレコメンドジョブをBoto3を使って試してみた
Personalizeはレコメンドモデルをソリューションという形で作成し、ソリューションをキャンペーンという形でホスティングすることでリアルタイムレコメンドを利用できます。キャンペーンによるリアルタイムレコメンド以外にも、S3に配置したユーザID一覧ファイル(JSON Lines/JSONL形式)に対して、レコメンドをまとめて作成することができるバッチレコメンド機能もあります。
バッチレコメンドの主なユースケースとしては、定期ジョブとしてLambdaやワークフローシステムなどで動かしたり、レコメンド内容の検証や評価のためにノートブック等から動かすことが考えられます。 今回は主に検証時にスクリプトからバッチレコメンドを行うことを想定して、Boto3を使ったバッチレコメンドの一通りの流れを試してみたいと思います。
今回は作成済みのHRNN(USER_PERSONALIZATIONタイプ)のソリューションバージョンを利用します。Personalizeの諸概念やソリューションの作成等については以下のエントリをご参照ください。
- Amazon Personalizeを使ってみた | Developers.IO
- Amazon Personalize のHRNN-Metadata レシピ触ってみた – 機械学習 on AWS Advent Calendar 2019 | Developers.IO
やってみる
MovieLens 100K Datasetを利用して作成したソリューション(ソリューションバージョン)を用いて、複数のユーザIDに対してバッチレコメンドを行い、そのレコメンド内容を確認するという流れで進めていきます。
準備
まずは使用するモジュールやライブラリを読み込み、入出力ファイルの保存場所等を定めます。
import pandas as pd import json from os import path import boto3 from datetime import datetime import time current_dt = datetime.now().strftime('%Y%m%d-%H%M%S') bucket_name = '<バケット名>' prefix = 'ml-100k' user_ids_json_s3_path = f'personalize-work/{prefix}/user_ids.jsonl' # 作成済みのソリューションバージョンのARN(レコメンドを作成するソリューションバージョン) solution_version_arn = 'arn:aws:personalize:ap-northeast-1:123456789:solution/DEMO-solution/12321063' # Personalizeがバッチレコメンド時に利用するIAMロール role_arn = 'arn:aws:iam::123456789:role/service-role/AmazonPersonalize-ExecutionRole' personalize = boto3.Session().client('personalize') bucket = boto3.Session().resource('s3').Bucket(bucket_name)
※バッチレコメンドの際に入力/出力ファイルの保存場所として利用するバケットはs3:GetObject
、s3:PutObject
、s3:ListBucket
操作をPersonalizeに許可してある必要があります。詳細については以下のドキュメントをご確認ください。
続いて、レコメンド情報の確認やレコメンド対象のユーザID取得のために、データ(MovieLens 100K Dataset)をダウンロードしてきます。
wget -N http://files.grouplens.org/datasets/movielens/ml-100k.zip unzip -o ml-100k.zip
ユーザによる映画の評価データ(インタラクションデータ)と映画のメタデータ(アイテムデータ)をデータフレームに読み込みます。
# ユーザによる映画の評価データ df = pd.read_csv('./ml-100k/u.data', sep='\t', names=['USER_ID', 'ITEM_ID', 'RATING', 'TIMESTAMP']) # 映画データ items = pd.read_csv('./ml-100k/u.item', sep='|', names=[ 'ITEM_ID', 'TITLE', 'RELEASE_DATE', 'VIDEO_RELEASE_DATE', 'IMDB_URL', 'UNKNOWN', 'ACTION', 'ADVENTURE', 'ANIMATION', "CHILDREN'S", 'COMEDY', 'CRIME', 'DOCUMENTARY', 'DRAMA', 'FANTASY', 'FILM-NOIR', 'HORROR', 'MUSICAL', 'MYSTERY', 'ROMANCE', 'SCI-FI', 'THRILLER', 'WAR', 'WESTERN' ], encoding='latin-1') items.set_index('ITEM_ID', inplace=True) # 映画のジャンルを分かりやすいように'|'で結合する def extract_genre(row): return '|'.join([i for i, v in row[5:].items() if v == 1 ]) items['GENRE'] = items.apply(extract_genre, axis=1) items = items[['TITLE', 'GENRE']]
インタラクションデータからユーザIDを取り出して、Personalizeのバッチレコメンドが対応しているJSONL形式に変換し、S3にアップロードします。
target_user_ids = list(df.USER_ID.unique()) user_ids = [json.dumps({'userId': str(user_id)}) for user_id in target_user_ids] bucket.Object(user_ids_json_s3_path).put(Body='\n'.join(user_ids))
バッチレコメンド
バッチレコメンドジョブを実行します。s3DataDestination
で指定するpath
はオブジェクトでないことを示すために、末尾に/
をつける必要があります。バッチレコメンドによって作成されるレコメンドデータは<s3DataDestinationで指定した場所><入力ファイル名>.out
に保存されます。
user_ids_json_s3_uri = f's3://{bucket_name}/{user_ids_json_s3_path}' response = personalize.create_batch_inference_job( jobName=f'{prefix}-{current_dt}', solutionVersionArn=solution_version_arn, numResults=100, # 作成するレコメンド数 jobInput={ 's3DataSource': { 'path': user_ids_json_s3_uri # 入力データの場所 } }, jobOutput={ 's3DataDestination': { 'path': path.join(path.dirname(user_ids_json_s3_uri), '') # レコメンドデータの出力場所 } }, roleArn=role_arn ) job_arn = response['batchInferenceJobArn']
ジョブが完了するまで待機します。
max_time = time.time() + 3600 while time.time() < max_time: response = personalize.describe_batch_inference_job( batchInferenceJobArn=job_arn ) status = response["batchInferenceJob"]["status"] print("DatasetGroup: {}".format(status)) if status == "ACTIVE" or status == "CREATE FAILED": break time.sleep(60)
ジョブの実行時間はモデルや実行タイミング、対象ユーザ数などいろいろな要因によって変動するものだと思います。参考までに、今回の943ユーザに対してそれぞれ100個のレコメンドを作成する場合だと、30分弱程度かかりました。
レコメンド内容の確認
S3に保存されたレコメンドデータをダウンロードし、扱いやすいように変換します。
def transform_recommendation(dic): return ( int(dic['input']['userId']), list(map(lambda x: int(x), dic['output']['recommendedItems'])) ) user_base_recommendation = bucket.Object(user_ids_json_s3_path + '.out').get()['Body'].read() user_base_recommendation = dict([transform_recommendation(json.loads(ss)) for ss in user_base_recommendation.splitlines()])
まずは対象ユーザのインタラクションデータを確認します。
def fetch_interaction(user_id): return df[df.USER_ID == user_id].join(items, on='ITEM_ID').sort_values('TIMESTAMP', ascending=False).set_index('ITEM_ID').loc[:, ['TITLE', 'GENRE', 'RATING', 'TIMESTAMP']] fetch_interaction(1)[:20]
レコメンドされた映画を確認してみます。
def fetch_recommendation(user_id): return items[items.index.isin(user_base_recommendation[user_id])] fetch_recommendation(1)[:20]
さいごに
Boto3を使ってPersonalizeのバッチレコメンドを行う流れについて紹介しました。用途によってはリアルタイムレコメンドに比べて、バッチレコメンドの方が楽な場合もあるかと思いますが、ジョブ実行時間のオーバーヘッドがそれなりにあります。それぞれの特徴を理解した上で活用していきたいですね。